package cKafka

import (
	"sync"
	"time"

	"github.com/IBM/sarama"

	"gitee.com/csingo/cContext"
)

type kafkaClient struct {
	id         string
	createTime int64
	client     sarama.Client
	operators  map[string]KafkaOperatorInterface
}

type kafkaContainer struct {
	lock *sync.Mutex
	// clients         map[string]sarama.Client
	// clientOperators map[string]map[string]KafkaOperatorInterface
	clients     map[string]*kafkaClient
	healthcheck int64
}

func (i *kafkaContainer) saveClient(clientId string, client sarama.Client) {
	i.lock.Lock()
	defer i.lock.Unlock()

	if _, ok := i.clients[clientId]; !ok {
		i.clients[clientId] = &kafkaClient{
			id:         clientId,
			createTime: time.Now().UnixNano(),
			client:     client,
			operators:  map[string]KafkaOperatorInterface{},
		}
		return
	}

	ctx := cContext.New()
	if i.clients[clientId].client.Closed() {
		for k, operator := range i.clients[clientId].operators {
			operator.Delete()
			operator.Close(ctx)
			delete(i.clients[clientId].operators, k)
		}
		delete(i.clients, clientId)
	}

	i.clients[clientId] = &kafkaClient{
		id:         clientId,
		createTime: time.Now().UnixNano(),
		client:     client,
		operators:  map[string]KafkaOperatorInterface{},
	}
}

func (i *kafkaContainer) getClient(clientId string) (client *kafkaClient) {
	i.lock.Lock()
	defer i.lock.Unlock()

	client = i.clients[clientId]
	if client == nil || client.client.Closed() {
		return nil
	}

	return client
}

func (i *kafkaContainer) saveOperator(clientId, operatorId string, operator KafkaOperatorInterface) {
	i.lock.Lock()
	defer i.lock.Unlock()

	ctx := cContext.New()
	if _, ok := i.clients[clientId]; !ok {
		return
	}

	if _, ok := i.clients[clientId].operators[operatorId]; ok {
		i.clients[clientId].operators[operatorId].Delete()
		i.clients[clientId].operators[operatorId].Close(ctx)
	}

	i.clients[clientId].operators[operatorId] = operator
}

func (i *kafkaContainer) getOperator(clientId, operatorId string) (operator KafkaOperatorInterface) {
	i.lock.Lock()
	defer i.lock.Unlock()

	if _, ok := i.clients[clientId]; !ok {
		return
	}

	return i.clients[clientId].operators[operatorId]
}

func (i *kafkaContainer) getOperatorIDs(clientId string) (operatorIds []string) {
	i.lock.Lock()
	defer i.lock.Unlock()

	operatorIds = []string{}

	if _, ok := i.clients[clientId]; !ok {
		return
	}

	if i.clients[clientId].operators == nil {
		return
	}

	for id, _ := range i.clients[clientId].operators {
		operatorIds = append(operatorIds, id)
	}

	return
}

func (i *kafkaContainer) deleteOperator(clientId, operatorId string) {
	i.lock.Lock()
	defer i.lock.Unlock()

	if _, ok := i.clients[clientId]; !ok {
		return
	}

	if i.clients[clientId].operators == nil {
		return
	}

	delete(i.clients[clientId].operators, operatorId)
}

func (i *kafkaContainer) GetAllClientIDs() (clientIds []string) {
	i.lock.Lock()
	defer i.lock.Unlock()

	clientIds = []string{}

	for k, _ := range i.clients {
		clientIds = append(clientIds, k)
	}

	return
}

var container = &kafkaContainer{
	lock:        &sync.Mutex{},
	clients:     map[string]*kafkaClient{},
	healthcheck: 0,
}
